home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from __future__ import with_statement
- import socket
- import threading
- import logging
- log = logging.getLogger('msn.sock')
- from collections import defaultdict
- from threading import RLock
- import common
- from util import callsback, lock, default_timer, to_hex, get
- from util.Events import EventMixin, event
- from Msnifier import Msnifier
- from MSNCommands import CommandProcessor, Message
- import msn
-
- dummy = lambda *a, **k: pass
-
- def trid(max = 2147483647, i = 0):
- while True:
- i += 1
- yield i
- if i == max:
- i = 0
- continue
-
-
- class MSNSocketBase(EventMixin):
- events = EventMixin.events | set(('on_connect', 'on_send', 'on_conn_error', 'on_close', 'on_message'))
- delim = '\r\n'
- payload_commands = 'MSG UUX UBX PAG IPG NOT GCF ADL UUN UBN RML FQY 241 508 UBM UUM'.split()
-
- def __init__(self):
- EventMixin.__init__(self)
- self.trid = trid()
- self.callbacks = defaultdict(list)
- if not hasattr(self, '_lock'):
- self._lock = threading.RLock()
-
-
-
- def set_trid(self, msgobj, trid):
- if trid is True:
- msgobj.trid = self.trid.next()
-
-
-
- def set_callbacks(self, msgobj, callback):
- if callback is sentinel:
- callback = None
-
- if msgobj.is_trid:
- self.callbacks[msgobj.trid].append(callback)
- else:
- self.callbacks[msgobj.cmd].append(callback)
-
-
- def pause(self):
- pass
-
-
- def unpause(self):
- pass
-
-
- def on_connect(self):
- return self
-
- on_connect = event(on_connect)
-
- def on_send(self, data):
- pass
-
- on_send = event(on_send)
-
- def on_conn_error(self, e = None):
- log.info('%r had a connection error: %r', self, e)
- return (self, e)
-
- on_conn_error = event(on_conn_error)
-
- def on_close(self):
- return self
-
- on_close = event(on_close)
-
- def on_message(self, msg):
- self.event('on_message', msg)
- self._lock.__enter__()
-
- try:
- callback = None
- if msg.cmd == 'QNG':
- msg.cmd = 'PNG'
- msg.trid = 0
-
-
- try:
- if not msg.trid:
- pass
- callback = self.callbacks[msg.cmd][0]
- except (KeyError, IndexError):
- e = None
- pop = False
-
- pop = True
- if callback is None:
- return None
-
- if pop:
- if msg.is_trid:
- for i in range(msg.trid):
-
- try:
- self.callbacks.pop(i)
- continue
- except (IndexError, KeyError):
- continue
-
-
-
- elif not msg.trid:
- self.callbacks[msg.cmd].pop(0)
-
- finally:
- pass
-
-
- try:
- if msg.is_error:
- f = callback.error
- else:
- f = callback.success
- except AttributeError:
- self._lock
- e = self._lock
- log.error('AttributeError in msnsocket.on_message: %r\ncallback was: %r', e, callback)
- except:
- self._lock
-
- log.debug('MSNSocket calling %r', f)
-
- try:
- f(self, msg)
- except Exception:
- self._lock
- e = self._lock
- log.error('Error in callback')
- import traceback
- traceback.print_exc()
- import inspect
- print inspect.getsource(f)
- except:
- self._lock
-
-
-
-
- class MSNSocket(MSNSocketBase, common.socket):
- speed_limit = 2000
- speed_window = 0.25
-
- def __init__(self):
- common.socket.__init__(self)
- MSNSocketBase.__init__(self)
- self.set_terminator(self.delim)
- self.data = ''
- self.expecting = 'command'
- self._server = None
- self.rater = Msnifier(self)
- self.rater.start()
- self._bc_lock = RLock()
- self.bytecount = [
- (0, default_timer())]
- log.debug('%r created', self)
-
-
- def get_local_sockname(self):
- return self.socket.getsockname()
-
-
- def connect_args_for(self, type, addr):
- return (addr,)
-
-
- def connect(self, host_port):
-
- try:
- (host, port) = host_port
- except (ValueError, TypeError):
- raise TypeError("%r address must be <type 'tuple'> (host, port) not %r (%r)", type(self).__name__, type(host_port), host_port)
-
- if self._server is not None:
- raise ValueError("Don't know which server to use! self._server = %r, host_port = %r.", self._server, host_port)
-
- self._server = host_port
- log.info('connecting socket to %r', self._server)
-
- try:
- common.socket.connect(self, self._server, error = self.on_conn_error)
- except Exception:
- e = None
- self.on_conn_error(e)
- return None
-
- self.bind_event('on_message', (lambda msg: log.debug('Received %r', msg)))
-
- _connect = connect
-
- def _disconnect(self):
- self.close_when_done()
-
-
- def _closed(self):
- return not getattr(self.socket, 'connected', False)
-
- _closed = property(_closed)
-
- def __repr__(self):
-
- try:
- s = 'connected to %r' % (self.socket.getpeername(),)
- except socket.error:
- s = 'not connected'
-
- return '<%s %s>' % (type(self).__name__, s)
-
-
- def test_connection(self, callback = None):
- self.send(Message('PNG'), callback = callback)
-
- test_connection = callsback(test_connection)
-
- def handle_connect(self):
- log.debug('connection established')
- self.on_connect()
-
-
- def handle_expt(self):
- log.warning('OOB data. self.data = %r', self.data)
- self.close()
-
-
- def collect_incoming_data(self, data):
- self.data += data
-
- collect_incoming_data = lock(collect_incoming_data)
-
- def set_terminator(self, term):
- common.socket.set_terminator(self, term)
-
-
- def found_terminator(self):
- self.data += self.delim
-
- try:
- self._lock.__enter__()
-
- try:
- self.data = ''
- data = self.data
- log.debug_s('IN : %r', data)
- dlist = data.split(' ')
- cmd = dlist[0]
- if self.expecting == 'command' and dlist[0] in self.payload_commands:
- self.expecting = 'payload'
- self.data = data
-
- try:
- new_term = int(dlist[-1])
- except ValueError:
- self._lock
- self._lock
- self
- new_term = 0
- except:
- self._lock
-
- return self.set_terminator(new_term)
- elif self.expecting == 'payload':
- self.expecting = 'command'
- data = data[:-len(self.delim)]
- payload = True
- else:
- payload = False
- self.set_terminator(self.delim)
- msg = Message.from_net(data, payload)
- finally:
- pass
-
- except Exception:
- self
- e = self
- log.info('error parsing message, testing connection\nError was %r', e)
- self.test_connection(success = self.conn_ok, error = self.conn_error)
- import traceback
- traceback.print_exc()
- except:
- self
-
- self.on_message(msg)
-
-
- def handle_close(self):
- log.warning('socket closed, self.data = %r', self.data)
- self.rater.stop()
- self.close()
-
-
- def close(self):
- log.warning('socket closing, self.data = %r', self.data)
- common.socket.close(self)
- self.on_close()
-
-
- def send_gen(self, gen, priority = 5):
- self.rater.send_pkt(gen, priority)
-
-
- def send(self, msgobj, trid = sentinel, callback = None, **kw):
- self.set_trid(msgobj, trid)
- log.debug('Sending %r', msgobj)
- self.rater.send_pkt(str(msgobj), **kw)
- self.set_callbacks(msgobj, callback)
-
- send = callsback(send)
-
- def conn_ok(self):
- log.info('connection test passed')
-
-
- def conn_error(self):
- log.warning('connection test failed')
- self.close_when_done()
- self.on_conn_error()
-
-
- def _send(self, data, *a, **k):
- log.log_s(0, 'sent: %s' % data)
- self._lock.__enter__()
-
- try:
- log.debug_s('OUT : %r %r %r' % (data, a, k))
- if not common.socket.send(self, data, *a, **k):
- log.critical('Message dropped in MSNSocket: <%s>' % data)
- finally:
- pass
-
- self.on_send(data)
- now = default_timer()
- self._bc_lock.__enter__()
-
- try:
- self.bytecount.append((len(data), now))
- finally:
- pass
-
-
-
- def time_to_send(self, data):
- now = default_timer()
- self._bc_lock.__enter__()
-
- try:
- self.bytecount = (self._bc_lock, filter)((lambda t: now - t[1] < self.speed_window), self.bytecount)
- finally:
- pass
-
- send_rate = sum((lambda .0: for b in .0:
- b[0])(self.bytecount))
- if send_rate < self.speed_limit:
- return 0
-
- log.debug('sending too fast')
- bytes = dlen = len(data)
- for size, tstamp in reversed(self.bytecount):
- bytes += size
- interval = now - tstamp
- if (bytes / interval) * self.speed_window > self.speed_limit:
- break
- continue
-
- tts = (bytes / self.speed_limit) * self.speed_window + interval
- log.log(5, 'currently sending at %d bytes/sec', send_rate)
- log.debug('sleeping for %r seconds' % tts)
- return tts
-
-
- def close_when_done(self):
- self.send(Message('OUT'))
- self.rater.stop()
- common.socket.close_when_done(self)
-
-
-